# 29.2 核心组件实现

18 分钟阅读

## 29.2.1 LLM 客户端

LLM 客户端是编程 Agent 与大语言模型交互的核心组件,负责处理 API 调用、响应解析、错误处理等。

### 基础实现

class LLMClient:
    """LLM client: sends chat-completion requests, parses replies and caches results.

    Args:
        config: LLM configuration object. The attributes read are ``api_key``,
            ``base_url``, ``model``, ``max_tokens``, ``temperature`` and
            ``system_prompt``.
    """

    def __init__(self, config: "LLMConfig"):
        self.config = config
        self.api_key = config.api_key
        self.base_url = config.base_url
        self.model = config.model
        self.max_tokens = config.max_tokens
        self.temperature = config.temperature

        # Session management. The synchronous requests.Session is kept for
        # backward compatibility; the async request path below uses aiohttp
        # and attaches the same headers explicitly via _build_headers().
        self.session = requests.Session()
        self.session.headers.update(self._build_headers())

        # Response cache keyed by a hash of (prompt, context, kwargs).
        self.cache = LRUCache(maxsize=1000)

        # Counters exposed through get_stats().
        self.stats = {
            'total_requests': 0,
            'cache_hits': 0,
            'errors': 0
        }

    def _build_headers(self) -> Dict[str, str]:
        """Return the HTTP headers (bearer auth + JSON content type) for API calls."""
        return {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

    async def complete(self, prompt: str, context: List[Dict] = None, **kwargs) -> str:
        """Generate a completion for ``prompt``.

        Args:
            prompt: The user message.
            context: Optional prior chat messages, inserted between the system
                prompt and the user prompt.
            **kwargs: Extra request fields; they override the client defaults
                (``model``, ``max_tokens``, ``temperature``, ...).

        Returns:
            The content of the first choice's message.

        Raises:
            Exception: On a non-200 API response or an unexpected response shape.
        """
        # Check the cache first. Compare against None so that a legitimately
        # empty completion ("") is still served from the cache.
        cache_key = self._generate_cache_key(prompt, context, kwargs)
        cached_response = self.cache.get(cache_key)
        if cached_response is not None:
            self.stats['cache_hits'] += 1
            return cached_response

        messages = self._build_messages(prompt, context)

        # Client defaults first; caller-supplied kwargs win on conflicts.
        params = {
            'model': self.model,
            'messages': messages,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            **kwargs
        }

        # Count every request actually sent, including failed ones, so that
        # errors / total_requests is a meaningful error rate.
        self.stats['total_requests'] += 1
        try:
            response = await self._send_request(params)
            result = self._parse_response(response)
        except Exception as e:
            self.stats['errors'] += 1
            logger.error(f"LLM request failed: {e}")
            raise

        self.cache.set(cache_key, result)
        return result

    async def _send_request(self, params: Dict) -> Dict:
        """POST ``params`` to the chat-completions endpoint and return the JSON body."""
        url = f"{self.base_url}/chat/completions"

        # The aiohttp session does not share the requests.Session headers, so
        # the auth headers must be passed here or the API rejects the call.
        async with aiohttp.ClientSession(headers=self._build_headers()) as session:
            async with session.post(url, json=params) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"API error: {response.status} - {error_text}")

                return await response.json()

    def _build_messages(self, prompt: str, context: List[Dict] = None) -> List[Dict]:
        """Build the message list: optional system prompt, context, then the user prompt."""
        messages = []

        if self.config.system_prompt:
            messages.append({
                'role': 'system',
                'content': self.config.system_prompt
            })

        if context:
            messages.extend(context)

        messages.append({
            'role': 'user',
            'content': prompt
        })

        return messages

    def _parse_response(self, response: Dict) -> str:
        """Extract ``choices[0].message.content`` from an API response."""
        try:
            return response['choices'][0]['message']['content']
        except (KeyError, IndexError, TypeError) as e:
            raise Exception(f"Invalid response format: {e}") from e

    def _generate_cache_key(self, prompt: str, context: List[Dict], kwargs: Dict) -> str:
        """Return a stable hash of the request inputs (not used for security)."""
        key_data = {
            'prompt': prompt,
            'context': context,
            'kwargs': kwargs
        }
        return hashlib.md5(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the request/cache/error counters."""
        return self.stats.copy()

### 流式响应支持

class StreamingLLMClient(LLMClient):
    """LLM client that additionally supports streamed (server-sent events) completions."""

    async def complete_stream(self, prompt: str,
                              context: List[Dict] = None,
                              **kwargs) -> AsyncIterator[str]:
        """Stream a completion, yielding content fragments as they arrive.

        Args:
            prompt: The user message.
            context: Optional prior chat messages placed before the prompt.
            **kwargs: Extra request fields overriding the client defaults.
                ``stream`` is always forced to True.

        Yields:
            Non-empty ``choices[0].delta.content`` fragments.

        Raises:
            Exception: On a non-200 API response.
            json.JSONDecodeError: If a ``data:`` line is not valid JSON.
        """
        messages = self._build_messages(prompt, context)

        params = {
            'model': self.model,
            'messages': messages,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            **kwargs,
            # Placed after **kwargs on purpose: the parser below only
            # understands streamed chunks, so stream=False must not win.
            'stream': True
        }

        url = f"{self.base_url}/chat/completions"
        # aiohttp does not share the base class's requests.Session headers,
        # so the bearer token is attached explicitly.
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.post(url, json=params) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"API error: {response.status} - {error_text}")

                # Each iteration yields one raw line of the SSE stream.
                async for raw_line in response.content:
                    line = raw_line.decode('utf-8').strip()

                    # Only "data:" fields carry payload; blank separator lines
                    # and other fields/comments are skipped. The space after
                    # the colon is optional in SSE.
                    if not line.startswith('data:'):
                        continue
                    payload = line[len('data:'):].strip()

                    # End-of-stream sentinel: stop reading.
                    if payload == '[DONE]':
                        break

                    data = json.loads(payload)
                    try:
                        content = data['choices'][0]['delta']['content']
                    except (KeyError, IndexError, TypeError):
                        # Chunks without text (e.g. role-only or finish chunks).
                        continue
                    if content:
                        yield content

## 29.2.2 工具管理器

工具管理器负责注册、管理和执行各种工具,是编程 Agent 与外部系统交互的桥梁。

### 工具接口

class Tool(ABC):
    """Base class for tools the agent can invoke.

    Subclasses implement ``execute`` and ``get_schema``; parameter validation
    against the JSON-Schema-like schema is provided here.
    """

    # JSON-Schema primitive type name -> accepted Python type(s).
    _TYPE_MAP = {
        'string': str,
        'number': (int, float),
        'integer': int,
        'boolean': bool,
        'array': list,
        'object': dict,
        'null': type(None)
    }

    def __init__(self, tool_id: str, name: str, description: str):
        self.id = tool_id
        self.name = name
        self.description = description

    @abstractmethod
    async def execute(self, parameters: Dict[str, Any]) -> "ToolResult":
        """Run the tool with the given parameters and return its result."""

    @abstractmethod
    def get_schema(self) -> Dict[str, Any]:
        """Return the parameter schema (``properties`` / ``required`` keys)."""

    def validate_parameters(self, parameters: Dict[str, Any]) -> bool:
        """Return True if ``parameters`` satisfy this tool's schema."""
        schema = self.get_schema()
        return self._validate_against_schema(parameters, schema)

    def _validate_against_schema(self, parameters: Dict,
                                 schema: Dict) -> bool:
        """Check required keys are present and known keys have the right type.

        Keys not listed in ``properties`` are accepted without a type check.
        """
        required = schema.get('required', [])
        properties = schema.get('properties', {})

        if any(param not in parameters for param in required):
            return False

        return all(
            self._check_type(value, properties[key].get('type'))
            for key, value in parameters.items()
            if key in properties
        )

    def _check_type(self, value: Any, expected_type: Any) -> bool:
        """Return True if ``value`` matches the JSON-Schema ``expected_type``.

        ``expected_type`` may be a type name, a list of alternative names
        (e.g. ``["string", "null"]``), or None/unknown, which accepts anything.
        """
        if isinstance(expected_type, (list, tuple)):
            return any(self._check_type(value, t) for t in expected_type)

        expected_python_type = self._TYPE_MAP.get(expected_type)
        if expected_python_type is None:
            return True

        # bool is a subclass of int in Python, but JSON Schema treats booleans
        # as distinct from numbers and integers.
        if expected_type in ('number', 'integer') and isinstance(value, bool):
            return False

        return isinstance(value, expected_python_type)

### 工具管理器实现

class ToolManager:
    """Registry of tools: registration, lookup, execution and execution history."""

    def __init__(self):
        self.tools: Dict[str, Tool] = {}
        # category name -> ids of tools registered under it
        self.tool_categories: Dict[str, List[str]] = {}
        # Every execution (successful or failed) in call order.
        self.execution_history: List[ToolExecution] = []

    def register_tool(self, tool: Tool, category: str = None):
        """Register ``tool`` (optionally under ``category``); duplicates are ignored with a warning."""
        tool_id = tool.id

        if tool_id in self.tools:
            logger.warning(f"Tool already registered: {tool_id}")
            return

        self.tools[tool_id] = tool

        if category:
            self.tool_categories.setdefault(category, []).append(tool_id)

        logger.info(f"Tool registered: {tool_id}")

    async def execute_tool(self, tool_id: str,
                           parameters: Dict[str, Any]) -> ToolResult:
        """Validate ``parameters``, run the tool and record the execution.

        Raises:
            ValueError: If the tool is unknown or the parameters are invalid.
            Exception: Whatever the tool's ``execute`` raises (re-raised after
                being recorded in the history).
        """
        tool = self.tools.get(tool_id)

        if tool is None:
            raise ValueError(f"Tool not found: {tool_id}")

        if not tool.validate_parameters(parameters):
            raise ValueError("Invalid parameters")

        execution = ToolExecution(
            tool_id=tool_id,
            parameters=parameters,
            started_at=datetime.utcnow()
        )

        try:
            result = await tool.execute(parameters)
        except Exception as e:
            execution.completed_at = datetime.utcnow()
            execution.success = False
            execution.error = str(e)
            self.execution_history.append(execution)

            logger.error(f"Tool execution failed: {e}")
            raise

        execution.completed_at = datetime.utcnow()
        execution.success = True
        execution.result = result
        self.execution_history.append(execution)

        return result

    def get_tool(self, tool_id: str) -> Tool:
        """Return the tool registered as ``tool_id``, or None."""
        return self.tools.get(tool_id)

    def list_tools(self, category: str = None) -> List[Tool]:
        """List all tools, or only those registered under ``category``."""
        if category:
            tool_ids = self.tool_categories.get(category, [])
            return [self.tools[tid] for tid in tool_ids]
        return list(self.tools.values())

    def get_tool_schema(self, tool_id: str) -> Dict[str, Any]:
        """Return the schema of ``tool_id``, or None if it is not registered."""
        tool = self.get_tool(tool_id)
        if tool:
            return tool.get_schema()
        return None

    def get_execution_history(self, tool_id: str = None,
                              limit: int = 100) -> List[ToolExecution]:
        """Return up to ``limit`` most recent executions, optionally for one tool.

        A non-positive ``limit`` returns an empty list. (Slicing with
        ``[-0:]`` would otherwise return the entire history.)
        """
        if limit <= 0:
            return []

        history = self.execution_history

        if tool_id:
            history = [e for e in history if e.tool_id == tool_id]

        return history[-limit:]

### 示例工具实现

class FileReadTool(Tool):
    """Tool that reads a UTF-8 text file and returns its contents.

    NOTE(review): ``file_path`` comes from the agent and is not restricted
    to any directory. Enforce sandboxing before exposing this tool.
    """

    def __init__(self):
        super().__init__(
            tool_id="file_read",
            name="File Read",
            description="Read the contents of a file"
        )

    @staticmethod
    def _read_text(file_path: str) -> str:
        """Blocking helper: read ``file_path`` as UTF-8 text."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    async def execute(self, parameters: Dict[str, Any]) -> ToolResult:
        """Read ``parameters['file_path']``.

        Returns a successful ToolResult with ``content`` and ``file_path``, or a
        failed ToolResult carrying the error message.
        """
        import asyncio

        file_path = parameters['file_path']
        try:
            # Run the blocking read in the default executor so a large file
            # or slow filesystem does not stall the event loop.
            loop = asyncio.get_running_loop()
            content = await loop.run_in_executor(None, self._read_text, file_path)
            return ToolResult(
                success=True,
                data={
                    'content': content,
                    'file_path': file_path
                },
                message=f"Successfully read file: {file_path}"
            )
        except Exception as e:
            # Best-effort: report the failure to the agent instead of raising.
            return ToolResult(
                success=False,
                error=str(e),
                message=f"Failed to read file: {file_path}"
            )

    def get_schema(self) -> Dict[str, Any]:
        """Return the parameter schema: a required string ``file_path``."""
        return {
            'type': 'object',
            'properties': {
                'file_path': {
                    'type': 'string',
                    'description': 'Path to the file to read'
                }
            },
            'required': ['file_path']
        }
class CodeExecuteTool(Tool):
    """Tool that runs a code snippet in a subprocess and returns its output.

    SECURITY: this executes arbitrary, agent-supplied code with the privileges
    of the current process. Only register it inside a proper sandbox.

    Args:
        timeout: Wall-clock limit in seconds for one execution (None disables
            it). A runaway snippet is killed when the limit is hit.
    """

    def __init__(self, timeout: float = 30.0):
        super().__init__(
            tool_id="code_execute",
            name="Code Execute",
            description="Execute code and return the output"
        )
        self.timeout = timeout

    async def execute(self, parameters: Dict[str, Any]) -> ToolResult:
        """Run ``parameters['code']`` (only ``language='python'`` is supported).

        The result ``data`` holds ``output`` (stdout), ``error`` (stderr or
        None) and ``returncode`` (the process exit status).
        """
        code = parameters['code']
        language = parameters.get('language', 'python')

        try:
            if language == 'python':
                result = await self._execute_python(code)
            else:
                raise ValueError(f"Unsupported language: {language}")

            return ToolResult(
                success=True,
                data={
                    'output': result['output'],
                    'error': result.get('error'),
                    'returncode': result.get('returncode')
                },
                message="Code executed successfully"
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                message="Code execution failed"
            )

    async def _execute_python(self, code: str) -> Dict[str, Any]:
        """Run ``code`` with ``python3 -c`` and capture stdout/stderr.

        Raises:
            TimeoutError: If the process exceeds ``self.timeout`` seconds.
        """
        # Argument list (no shell), so the code is never shell-interpreted.
        process = await asyncio.create_subprocess_exec(
            'python3',
            '-c',
            code,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(), timeout=self.timeout
            )
        except asyncio.TimeoutError:
            # Kill and reap the runaway child so it does not linger.
            process.kill()
            await process.wait()
            raise TimeoutError(
                f"Code execution timed out after {self.timeout} seconds"
            )

        # errors='replace': non-UTF-8 program output must not turn a
        # completed run into a decoding failure.
        return {
            'output': stdout.decode('utf-8', errors='replace'),
            'error': stderr.decode('utf-8', errors='replace') if stderr else None,
            'returncode': process.returncode
        }

    def get_schema(self) -> Dict[str, Any]:
        """Return the parameter schema: required ``code``, optional ``language``."""
        return {
            'type': 'object',
            'properties': {
                'code': {
                    'type': 'string',
                    'description': 'Code to execute'
                },
                'language': {
                    'type': 'string',
                    'description': 'Programming language',
                    'default': 'python'
                }
            },
            'required': ['code']
        }

## 29.2.3 记忆系统

记忆系统负责存储和管理 Agent 的知识、经验和交互历史。

### 记忆系统架构

class MemorySystem:
    """Agent memory: per-session short-term history plus long-term, vector and graph stores.

    Args:
        config: Memory configuration. Reads ``storage_config``,
            ``vector_config`` and ``graph_config``.
        llm_client: Client used to extract knowledge from interactions.
            If None, knowledge extraction is skipped.
    """

    def __init__(self, config: MemoryConfig, llm_client: "LLMClient" = None):
        self.config = config
        self.llm_client = llm_client

        # Short-term memory (per session), keyed by session id.
        self.short_term_memory: Dict[str, SessionMemory] = {}

        # Long-term memory (persistent).
        self.long_term_memory = LongTermMemory(config.storage_config)

        # Vector store (semantic retrieval).
        self.vector_store = VectorStore(config.vector_config)

        # Knowledge graph.
        self.knowledge_graph = KnowledgeGraph(config.graph_config)

    async def store_interaction(self, request: UserRequest,
                                response: AgentResponse):
        """Append the interaction to its session memory and index extracted knowledge."""
        session_id = request.session_id

        # Get or create this session's memory.
        if session_id not in self.short_term_memory:
            self.short_term_memory[session_id] = SessionMemory(
                session_id=session_id
            )

        session_memory = self.short_term_memory[session_id]

        interaction = Interaction(
            request=request,
            response=response,
            timestamp=datetime.utcnow()
        )

        session_memory.add_interaction(interaction)

        await self._extract_and_store_knowledge(interaction)

    async def retrieve_context(self, session_id: str,
                               query: str = None,
                               history_limit: int = 10,
                               top_k: int = 5) -> Context:
        """Build a Context from recent session interactions and semantic matches.

        Args:
            session_id: Session whose recent interactions are included.
            query: If given, the vector store is searched for related memories.
            history_limit: Number of recent interactions to include.
            top_k: Number of semantic matches to include.
        """
        context = Context()

        session_memory = self.short_term_memory.get(session_id)
        if session_memory:
            context.interactions = session_memory.get_recent_interactions(
                limit=history_limit
            )

        if query:
            relevant_memories = await self.vector_store.search(
                query,
                top_k=top_k
            )
            context.relevant_memories = relevant_memories

        return context

    async def _extract_and_store_knowledge(self,
                                           interaction: Interaction):
        """Extract knowledge items and add each to the vector store and knowledge graph."""
        knowledge = await self._extract_knowledge(interaction)

        for item in knowledge:
            await self.vector_store.add(
                id=item.id,
                text=item.text,
                metadata=item.metadata
            )

            await self.knowledge_graph.add_node(
                id=item.id,
                type=item.type,
                properties=item.metadata
            )

    async def _extract_knowledge(self,
                                 interaction: Interaction) -> List[KnowledgeItem]:
        """Ask the LLM to extract knowledge from an interaction.

        Returns an empty list when no ``llm_client`` was configured.
        """
        if self.llm_client is None:
            return []

        prompt = f"""
        从以下交互中提取关键知识:

        用户请求:{interaction.request.text}
        Agent 响应:{interaction.response.text}

        请提取:

        """

        response = await self.llm_client.complete(prompt)

        # NOTE(review): _parse_knowledge is not defined in this class; it must
        # be provided elsewhere (e.g. by a subclass). Confirm before use.
        return self._parse_knowledge(response)

### 向量存储实现

class VectorStore:
    """In-memory document store with a FAISS exact-L2 index for similarity search."""

    def __init__(self, config: VectorStoreConfig):
        self.config = config
        self.embedding_client = EmbeddingClient(config.embedding_config)
        # FAISS index over all document embeddings; None until the first add.
        self.index = None
        # Insertion-ordered; row i of the index is the i-th document here.
        self.documents: Dict[str, Document] = {}

    async def add(self, id: str, text: str, metadata: Dict = None):
        """Embed ``text`` and store it under ``id`` (replacing any existing document)."""
        embedding = await self.embedding_client.embed(text)

        document = Document(
            id=id,
            text=text,
            embedding=embedding,
            metadata=metadata or {}
        )

        self.documents[id] = document

        self._update_index()

    async def search(self, query: str, top_k: int = 10) -> List[Document]:
        """Return up to ``top_k`` documents nearest to ``query``, best first.

        Each returned document gets ``similarity = 1 / (1 + L2 distance)``.
        """
        # Bail out before embedding the query: an empty store or a
        # non-positive k needs no embedding call (FAISS also rejects k <= 0).
        if self.index is None or top_k <= 0:
            return []

        query_embedding = await self.embedding_client.embed(query)

        # FAISS expects a float32 matrix of shape (n_queries, dim).
        query_matrix = np.asarray([query_embedding], dtype='float32')
        k = min(top_k, self.index.ntotal)
        distances, indices = self.index.search(query_matrix, k)

        results = []
        doc_ids = list(self.documents.keys())
        for distance, idx in zip(distances[0], indices[0]):
            # FAISS pads missing results with -1.
            if 0 <= idx < len(doc_ids):
                document = self.documents[doc_ids[idx]]
                document.similarity = 1.0 / (1.0 + float(distance))
                results.append(document)

        return results

    def _update_index(self):
        """Rebuild the FAISS index from all stored embeddings.

        NOTE: the full rebuild makes each add O(n); fine for small stores.
        """
        if not self.documents:
            return

        embeddings = np.asarray(
            [doc.embedding for doc in self.documents.values()],
            dtype='float32'
        )

        dimension = embeddings.shape[1]
        index = faiss.IndexFlatL2(dimension)
        index.add(embeddings)
        self.index = index

## 29.2.4 任务规划器

任务规划器负责将用户请求分解为可执行的任务序列。

class TaskPlanner:
    """Turns an intent into an ordered list of tasks using per-intent strategies."""

    def __init__(self):
        self.task_templates: Dict[str, TaskTemplate] = {}
        # Strategies keyed by the intent name they handle; "default" is the fallback.
        self.planning_strategies: Dict[str, PlanningStrategy] = {}

        self._register_default_strategies()

    async def plan(self, intent: Intent,
                   context: Dict[str, Any]) -> List[Task]:
        """Generate tasks for ``intent`` and order them so dependencies run first.

        Raises:
            ValueError: If the generated tasks contain a dependency cycle.
        """
        strategy = self._select_strategy(intent, context)

        tasks = await strategy.generate_tasks(intent, context)

        tasks = self._optimize_task_order(tasks)

        return tasks

    def _select_strategy(self, intent: Intent,
                         context: Dict) -> PlanningStrategy:
        """Return the strategy registered for ``intent.name``, else the "default" one."""
        strategy = self.planning_strategies.get(intent.name)
        if strategy is None:
            strategy = self.planning_strategies.get("default")
        return strategy

    def _optimize_task_order(self, tasks: List[Task]) -> List[Task]:
        """Return ``tasks`` in dependency order (topological sort)."""
        dependency_graph = self._build_dependency_graph(tasks)

        return self._topological_sort(dependency_graph, tasks)

    def _build_dependency_graph(self,
                                tasks: List[Task]) -> Dict[str, List[str]]:
        """Map each task id to the ids it depends on (None treated as no deps)."""
        return {task.id: list(task.dependencies or []) for task in tasks}

    def _topological_sort(self,
                          graph: Dict[str, List[str]],
                          tasks: List[Task]) -> List[Task]:
        """Depth-first topological sort of ``tasks`` according to ``graph``.

        Dependencies that refer to ids not among ``tasks`` are treated as
        external and omitted from the result.

        Raises:
            ValueError: If the dependencies form a cycle.
        """
        visiting = set()  # on the current DFS path (used to detect cycles)
        done = set()      # already emitted
        order: List[str] = []

        def visit(task_id: str):
            if task_id in done:
                return
            if task_id in visiting:
                raise ValueError(f"Circular task dependency involving: {task_id}")

            visiting.add(task_id)
            # Dependencies are emitted before the task itself.
            for dep_id in graph.get(task_id) or []:
                visit(dep_id)
            visiting.discard(task_id)

            done.add(task_id)
            order.append(task_id)

        for task_id in graph:
            visit(task_id)

        task_map = {task.id: task for task in tasks}
        return [task_map[tid] for tid in order if tid in task_map]

    def _register_default_strategies(self):
        """Register the built-in strategies."""
        self.planning_strategies["code_generation"] = CodeGenerationStrategy()
        self.planning_strategies["code_analysis"] = CodeAnalysisStrategy()
        self.planning_strategies["debugging"] = DebuggingStrategy()
        self.planning_strategies["default"] = DefaultStrategy()

通过实现这些核心组件,我们可以构建一个功能完整的编程 Agent 基础框架。

标记本节教程为已读

记录您的学习进度,方便后续查看。